🕺Kapoeira💃

kapoeira

Presentation

kara
Mehdi Rebiai

Presentation

odile
Johanna Vauchel

Presentation

darmon
François Barbe

Open Source at Lectra

Lectra

lectra 4.0

Open Source

  • Use FOSS projects to build our solutions

    • Define a FOSS policy to manage this usage

  • Use InnerSource projects

    • Incubator for future open source projects

  • Contribute to and create open source projects

Take Away 🎁

  • Discover a new tool to test your Kafka Streams applications

  • Improve communication between PO, QA and DEV

  • Get tips for using it every day

  • Have a good time (we hope)

📽️ Kapoeira story 🎬

cine

Vector

vector

  • Thousands of cutters

  • Millions of events every day

Enrich and collect data

enrichData

Data pipeline

data pipeline

We are perfect !

perfect

Data is perfect !

pipeline example

NO !

no not

NO !

pipeline example poo

Solution?

TEST OUR STREAMS!

How to test?

fast

Fast and efficient…​

Scala Test Example

package com.lectra.kafka.stream.example

import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.apache.kafka.streams._
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, GivenWhenThen}

import java.io.File
import java.util.UUID

class KafkaStreamSelectKeyTest extends AnyFlatSpec with Matchers with BeforeAndAfterEach with BeforeAndAfterAll with GivenWhenThen {

  private val stringSerializer = new StringSerializer()
  private val stringDeserializer = new StringDeserializer()

  private var driver: TopologyTestDriver = _
  private var inputTopic: TestInputTopic[String, String] = _
  private var outputTopic: TestOutputTopic[String, String] = _

  private def tempDir: File = {
    val ioDir = System.getProperty("java.io.tmpdir")
    val f = new File(ioDir, "kafka-" + UUID.randomUUID().toString)
    f.mkdirs()
    f.deleteOnExit()
    f
  }

  private def buildTopology(): Topology = {
    import org.apache.kafka.streams.scala.StreamsBuilder
    val builder = new StreamsBuilder
    KafkaStreamSelectKey.topology(builder)
    builder.build()
  }

  override def beforeEach(): Unit = {
    KafkaStreamSelectKey.config.put(StreamsConfig.STATE_DIR_CONFIG, tempDir.getAbsolutePath)
    driver = new TopologyTestDriver(buildTopology(), KafkaStreamSelectKey.config)
    inputTopic = driver.createInputTopic(KafkaStreamSelectKey.topicIn, stringSerializer, stringSerializer)
    outputTopic = driver.createOutputTopic(KafkaStreamSelectKey.topicChangedKey, stringDeserializer, stringDeserializer)
  }

  override def afterEach(): Unit = {
    driver.close()
  }


  "Nominal case for select" should "change the key of records by combining key and value with -" in {
    val key = "mykey"
    val value = "myvalue"
    val key2 = "yourkey"
    val value2 = "yourvalue"

    inputTopic.pipeInput(key, value)
    inputTopic.pipeInput(key2, value2)
    val expectedKey1 = s"$key-$value"
    val expectedKey2 = s"$key2-$value2"

    outputTopic.getQueueSize shouldBe 2
    outputTopic.readKeyValue() shouldBe new KeyValue(expectedKey1, value)
    outputTopic.readKeyValue() shouldBe new KeyValue(expectedKey2, value2)

  }


}

Happy 😀…​🤮

content vomi

…​ But it’s a mocked infrastructure

fail

It does not test the integration with a real Kafka cluster

…​ And only unit tests

end to end

How do we test several streams together?

…​ And it’s not for QA (=👮)

les nuls police

…​ And it’s technical code

simon kara

How to communicate with DEV/PO/QA ?

What do we need ?

test pyramid cesar triangle

Integration tests with a simple syntax

Inspiration : Karate

karate

Our context ≠ HTTP

kafkalogo

Integration with Kafka Streams !

What is Kapoeira?

Cucumber Scala with a dedicated Gherkin DSL

cucumber

What is Gherkin ?

Feature: A calculator example

  Scenario: The sum example
    Given a = 2
    And b = 3
    When a + b
    Then result == 5

  Scenario: The mult example
    Given a = 2
    And b = 3
    When a * b
    Then result == 6

What is Cucumber ?

// Assumes cucumber-java annotations and JUnit 5 assertions are on the classpath
import io.cucumber.java.en.{Given, Then, When}
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}

import scala.collection.mutable

class StepDefinitions {
  // Variables declared by the Given steps, indexed by name
  private val variables = mutable.Map[String, Long]()
  private var result: Long = 0

  @Given("^\\s*([a-zA-Z]+)\\s*=\\s*(\\d+)\\s*$")
  def addVariable(name: String, value: Long): Unit = {
    variables += (name -> value)
  }

  @When("^\\s*([a-zA-Z]+)\\s*\\+\\s*([a-zA-Z]+)\\s*$")
  def sum(left: String, right: String): Unit = {
    result = valueOf(left) + valueOf(right)
  }

  @When("^\\s*([a-zA-Z]+)\\s*\\*\\s*([a-zA-Z]+)\\s*$")
  def mult(left: String, right: String): Unit = {
    result = valueOf(left) * valueOf(right)
  }

  @Then("^\\s*result\\s*==\\s*(\\d+)\\s*$")
  def checkResult(expectedResult: Long): Unit = {
    assertEquals(expectedResult, result)
  }

  // Fails the step if the variable was never declared, otherwise returns its value
  private def valueOf(name: String): Long = {
    assertTrue(variables.contains(name), "Unknown variable " + name)
    variables(name)
  }
}
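
To make the link between the Gherkin text and the step definitions more concrete, here is a minimal, dependency-free sketch of the matching idea: each step line is compared against a list of regular expressions, and the first one that matches runs its action with the captured groups. `MiniCucumber`, `Step` and `run` are hypothetical names invented for this illustration. This is not how Cucumber is implemented, only a simplified model of what the annotations express.

```scala
import scala.collection.mutable
import scala.util.matching.Regex

// Hypothetical, dependency-free model of step matching; not Cucumber's real API.
object MiniCucumber {
  // A step definition: a pattern plus the action applied to its captured groups.
  final case class Step(pattern: Regex, action: List[String] => Unit)

  private val variables = mutable.Map[String, Long]()
  private var result: Long = 0L

  private val steps: List[Step] = List(
    Step("""\s*([a-zA-Z]+)\s*=\s*(\d+)\s*""".r,
      g => { variables(g(0)) = g(1).toLong }),
    Step("""\s*([a-zA-Z]+)\s*\+\s*([a-zA-Z]+)\s*""".r,
      g => { result = variables(g(0)) + variables(g(1)) }),
    Step("""\s*([a-zA-Z]+)\s*\*\s*([a-zA-Z]+)\s*""".r,
      g => { result = variables(g(0)) * variables(g(1)) }),
    Step("""\s*result\s*==\s*(\d+)\s*""".r,
      g => { assert(result == g(0).toLong, s"expected ${g(0)}, got $result") })
  )

  /** Runs each step line of a scenario in order and returns the last computed result. */
  def run(scenario: String): Long = {
    variables.clear()
    result = 0L
    for (line <- scenario.linesIterator.map(_.trim) if line.nonEmpty) {
      // Drop the leading keyword (Given/And/When/Then); the remainder is the step text.
      val text = line.split("\\s+", 2).lift(1).getOrElse("")
      steps.iterator
        .map(step => (step, step.pattern.unapplySeq(text)))
        .collectFirst { case (step, Some(groups)) => step.action(groups) }
        .getOrElse(sys.error(s"No step definition matches: $text"))
    }
    result
  }
}
```

Passing the four step lines of the sum scenario to `MiniCucumber.run` returns 5, and the mult scenario returns 6. A step line that matches no pattern fails immediately, which is roughly what an undefined step looks like in a real Cucumber run.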

How does it work ?

archi

How does it work ?

kapoeira diagram

2020 - Birth of Kapoeira

young chabat

  • Inner Source @Lectra

  • First syntax created with a QA

  • Backend calling Confluent CLI

CLI…​

# Console producer
kafka-console-producer \
  --topic orders \
  --bootstrap-server broker:9092

# Console consumer
kafka-console-consumer \
  --topic orders \
  --bootstrap-server broker:9092 \
  --from-beginning

2020 - Custom backend

scala

  • Specific Scala implementation for Kafka Consumer/Producer

  • Better syntax with Gherkin Datatable

2021 - ZIO

zio

  • Improve performance

  • Add a parallel mode

  • Batches to manage joins between topics

2023 - Open Source

2024 - New features

simon

Thanks to you !

Demo

burger quiz
report

User Story n°1

As a 🧑‍🍳, WHEN I send a 🥔 to my robot, THEN I expect to have 🍟

fries-factory

fries factory

User Story n°2

As a 🧑‍🍳, WHEN I send 🥔🥔🥔 to my robot, THEN I expect to have 🍟🍟🍟

User Story n°3

As a 🧑‍🍳, WHEN I send a 🥔 to my robot with a KETCHUP(🍅) request header, THEN I expect to have a 🍟 with KETCHUP(🍅).

User Story n°4

As a 🧑‍🍳, WHEN I send the ingredients (🍞,🍅,🥩) to dedicated robots, THEN I expect to have a menu 🍔

burger-factory

burger factory

meal-factory chewing gum

meal factory

burger-quiz

burger quiz

🥦 REX 🥃

  • 👐 Big community at Lectra

  • 🤝 Used as acceptance tests and as specifications during story grooming

  • ✏️ Easy for QA to enrich existing tests

  • 🔄 Used as end-to-end tests

Advantages 💪

advantages

  • Runs against a real Kafka infrastructure

  • Simple to use

  • Communicate with PO/QA/DEV

  • Tests as documentation

  • Tests as acceptance criteria for stories

Want to use it ? 👩‍🏭

banco

How to build ? 🔨

docker build -t kapoeira:latest .

How to use ? ⚒️

docker run --rm -ti \
-v <PATH_TO_YOUR_FEATURES_FOLDER>:/features \
-v /var/run/docker.sock:/var/run/docker.sock \
-e KAFKA_BOOTSTRAP_SERVER=<HOST:PORT[,HOST2:PORT2,HOST3:PORT3,...]> \
-e KAFKA_SCHEMA_REGISTRY_URL=<URL> \
-e KAFKA_USER=<XXX> \
-e KAFKA_PASSWORD=<****> \
-e JAAS_AUTHENT=<true (default) | false> \
-e LOGGING_LEVEL=<INFO (default) | ERROR | ...> \
-e THREADS=<8 (default) | ... > \
lectratech/kapoeira

How to contribute ? 💵

TODO

Thank you !

kapoeira

Thanks for your feedback